Logstash + Snowflakeを活用してTwitterエゴサーチ分析基盤を作る
こんにちは、データアナリティクス事業本部・新納(にいの)です。突然ですが皆さんはTwitterでエゴサーチ(自分の名前の検索)していますか?私はしていません。
エゴサーチとは無縁の暮らしをしていた私ですが、去る2021年2月1日、クラスメソッドが「Zenn」を買収したプレスリリースを発表したことで状況が変わります。
このニュースがTwitterでも話題となり、一時は「Zenn」「クラスメソッド」がトレンドにランクインしました。社員の中には「世間ではどんな反応をされているのかな...」とドキドキしながら検索ウィンドウに社名を入れた者もいるでしょう。私です。
社長や取締役を昼夜問わずタイムラインで見かけるくらいなので業務時間中にTwitterをして怒られるような会社ではありませんが、せっかくなので自動で継続的にTwitterをエゴサーチして分析してくれる環境を作ってみました。
こんな分析基盤を作りました
完成図はこちら。
- EC2インスタンス上で実行しているLogstashでツイート取得・S3バケットへのアウトプットを行う
- ツイート(JSON形式)をSnowflake上のデータベースに連携する
- データパイプライン・SnowpipeでS3からSnowflake上のデータベースへ連携
- ストリーム(Snowflakeの機能)でレコードに対する差分を検知
- タスク(Snowflakeの機能)でJSONをパースして別テーブルに格納する処理を定期実行
- Alteryx・Tableauを使ってデータ加工・可視化を行う
この構成の良いところは以下の通り。
- ELT(Extract・Load・Transform)により、格納先のデータベース内でデータの変換処理を行える
- プログラムを書かなくてもLogstashがツイートの取得からS3への出力まで対応してくれる
- 半構造化データを強力にサポートするSnowflakeを使用することで、JSON形式のデータを簡単にテーブル定義できる
- データパイプラインの作成がSnowflake内で完結できる(Snowpipe、ストリーム、タスク)
前提条件
今回の検証で使用した環境は以下の通り。
Logstashを実行するAmazon EC2
- AMI: Amazon Linux 2 AMI (HVM), SSD Volume Type - ami-0992fc94ca0f1415a
- インスタンスタイプ:t2.xlarge
Snowflake
- Snowflake 5.3.1
その他
- Tableau Desktop 2020.4
- Alteryx Designer 2020.4
また、Twitter API利用のためにDeveloper Accountに申し込みを済ませる必要があります。
Logstashを実行する
まずはツイートを取得するための環境をAmazon EC2上に構築します。余談ですが、Windows10であればsshクライアントを使用しなくてもコマンドプロンプトでsshできることに今更ながら気が付きました。本当にありがとう。
Logstashのインストール
下記サイトの手順に従ってLogstashをインストールします。
公開署名キーをダウンロード・インストールします。
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
LogstashのリポジトリURLを追加するため、リポジトリのあるディレクトリへ移動します。
cd /etc/yum.repos.d/
リポジトリを作成します。
sudo touch logstash.repo
作成したリポジトリをviコマンドで編集します。
sudo vi logstash.repo
以下内容を追記し、保存します。
[logstash-7.x] name=Elastic repository for 7.x packages baseurl=https://artifacts.elastic.co/packages/7.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
Logstashをインストールします。
sudo yum install logstash
インストールが完了すると以下のように表示されます。
Installed: logstash.x86_64 1:7.10.2-1 Complete!
プラグインのインストール
Logstashではデータソースに応じてインプット・アウトプットに使用するプラグインが提供されています。使用可能なプラグインは以下の通り。
- Input plugins | Logstash Reference [7.10] | Elastic
- Output plugins | Logstash Reference [7.10] | Elastic
Twitter input plugin
インプットにはTwitter input Pluginを使用します。Twitter Streaming APIを使用しており、ストリーム形式でリアルタイムにツイートが取得できます。(条件に合致するツイートがあるとどんどんデータが生まれる)
sudo /usr/share/logstash/bin/logstash-plugin install logstash-input-twitter
S3 output plugin
アウトプット先に合わせてS3 output pluginを使用します。
sudo /usr/share/logstash/bin/logstash-plugin install logstash-output-s3
設定ファイルの作成
設定ファイルにはインプットやアウトプット先の情報を記述します。保存先のディレクトリは/etc/logstash/conf.d
で、ファイル名は任意のものに設定します。(今回はlogstash.confにしました)viコマンドで内容を編集します。
vi /etc/logstash/conf.d/logstash.conf
inputにはTwitterの、outputにはS3の認証情報を設定します。今回の検証で設定した項目は以下の通り。
項目 | 値 |
---|---|
consumer_key | TwitterのDeveloper Portalから取得できるConsumer Key(必須) |
consumer_secret | 同上のConsumer Secret(必須) |
oauth_token | TwitterのDeveloper Portalから取得できるAccess token(必須) |
oauth_token_secret | 同上のAccess secret(必須) |
keywords | 検索キーワード(オプション) |
full_tweet | 完全なツイートの取得。デフォルトはfalse (オプション) |
S3
アクセスキー、シークレットキーを指定しない場合はYAMLで記述した認証情報ファイルをaws_credentials_file
に指定する必要があります。
項目 | 値 |
---|---|
access_key_id | S3 PutObjectの権限を持つIAMユーザーのアクセスキー |
secret_access_key | 同上のシークレットキー |
region | S3バケットが配置されたリージョン(デフォルトはus-east-1 ) |
bucket | S3バケット名(必須) |
codec | エンコードしたいファイル形式 |
size_file | ファイルをローテーションするサイズ |
time_file | ファイルをローテーションする時間 |
設定内容の一例はこちら。
input { twitter { consumer_key => "" consumer_secret => "" oauth_token => "" oauth_token_secret => "" keywords => ["クラスメソッド","クラメソ"] full_tweet => true } } output { s3 { access_key_id => "" secret_access_key => "" region => "ap-northeast-1" bucket => "cm-niino-logstash" codec => "json_lines" size_file => 2048 time_file => 5 } }
Logstashの実行
やっとLogstashを実行できる環境が整いました。以下のコマンドを実行します。
sudo -u logstash /usr/share/logstash/bin/logstash --path.settings "/etc/logstash" -f /etc/logstash/conf.d/logstash.conf
[logstash.agent ] Successfully started Logstash API endpoint
といったメッセージが表示されれば無事に実行されています。
うまくデータが生成されれば、ターゲットに指定したS3バケットに以下のような命名のファイルが生成されます。
ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt
Snowflakeでデータパイプラインを作る
クラウド型データウェアハウスであるSnowflakeにはデータパイプラインを実現する機能群が提供されています。つまり、Snowflake側でデータパイプラインを構築しておけば、勝手にデータの生成を検知してSnowflakeにデータを引っ張ってきてくれて、勝手に差分を検知して勝手にデータを加工して、勝手にテーブルに格納しておいてくれます。本当にありがとう。
以下のデータパイプラインを作成します。
- Snowpipe:連続データロード
- ストリーム:変更データの追跡
- タスク:定期的なタスク実行
以下のエントリの手順に沿って進めていきます。
Snowpipeを作成
S3バケットにデータが生成されると自動でSnowflake上のテーブルにデータを挿入してくれるSnowpipeを作成します。
Logstashのアウトプット先に指定しているS3バケットをSnowflakeの外部ステージとして設定します。GUI画面で作成する場合はデータベース→ステージから作成可能です。今回はLOGSTASHTEST
というステージを作成しました。
詳しい設定方法は以下エントリをご参照ください。
まず、ツイートのデータを格納しておくテーブルを作成します。JSON形式のデータを蓄積させておくため、型にはVARIANT
を指定しておきます。
CREATE TABLE "NIINO_TEST_PUBLIC"."TWITTER"."LOGSTASH_TWEETS"( record VARIANT );
パイプを作成します。先ほど設定したS3バケットの外部テーブルLOGSTASHTEST
から、今しがた作成したテーブルLOGSTASH_TEST
へデータをJSON形式で自動コピーするよう設定するSQL文です。
CREATE PIPE "NIINO_TEST_PUBLIC"."TWITTER".LOGSTASH_PIPE AUTO_INGEST=TRUE AS COPY INTO "NIINO_TEST_PUBLIC"."TWITTER"."LOGSTASH_TWEETS" FROM @LOGSTASHTEST FILE_FORMAT = (TYPE = 'JSON') ;
外部テーブルのS3バケットにイベント通知設定をします。今回の検証ではS3バケット上のファイルにあらゆる変更があった場合にAmazon SQSキューへイベント通知を送ります。
以下SQL文を実行し、ARN(Amazonリソースネーム)を取得します。notification_channel
に記載されたarn:aws:sqs
から始まる文字列をコピーしておきます。
SHOW PIPES;
AWSマネジメントコンソールに移動し、S3バケットの画面へ移動してイベント通知設定を行います。バケット→プロパティ→イベント通知から「イベント通知を作成」へ遷移します。
任意のイベント名を、特定のフォルダ内にデータが連携されるように設定している場合はプレフィックスにフォルダ名を入力します。
イベントタイプでは「すべてのオブジェクト作成イベント」を選択します。
送信先にSQSキューを選択し、「SQSキューARNを入力」を選択して先ほどコピーしたARNを入力します。
すでにS3バケットにデータが生成されている場合は以下のSQL文を実行します。ALTER PIPE REFRESH文を実行すると過去7日間にステージングされたデータファイルを取り込みます。7日以上前にステージングされている場合はCOPY INTOを実行する必要があります。
ALTER PIPE LOGSTASH_PIPE REFRESH;
データがテーブルに取り込まれていればSnowpipeの準備は完了です。
ストリームを作成
LOGSTASH_TWEETS
テーブルに差分があれば検出するようなストリームを以下のSQL文で作成します。
USE DATABASE "NIINO_TEST_PUBLIC"; USE SCHEMA TWITTER; CREATE OR REPLACE STREAM TWITTER_STREAM ON TABLE LOGSTASH_TWEETS ;
タスクを作成
LOGSTASH_TWEETS
テーブルに蓄積されたJSON形式のデータをパースして分析しやすい形に整形する処理を、タスクを作成することで定期的に実行させます。
今回はParse_logstash_tweets
という名前のタスクを作成し、JSON形式のデータをパースしてPARSED_TWEETS
テーブルにインサートする処理を1分ごとに実行します。
まずは格納させる先のPARSED_TWEETS
テーブルを作成しておきます。
USE DATABASE "NIINO_TEST_PUBLIC"; USE SCHEMA TWITTER; CREATE OR REPLACE TABLE PARSED_TWEETS ( timestamp_ms timestamp, retweet_count number, favorite_count number, quote_count number, reply_count number, hashtags varchar(280), text varchar(280), source varchar(280), user_screen_name varchar(280), user_name varchar(280), user_location varchar(280), user_description varchar(280) );
次にタスクを作成します。
USE DATABASE "NIINO_TEST_PUBLIC"; USE SCHEMA TWITTER; CREATE OR REPLACE TASK Parse_Logstash_tweets WAREHOUSE = X_SMALL_WH SCHEDULE = '1 minute' WHEN SYSTEM$STREAM_HAS_DATA('TWITTER_STREAM') AS INSERT INTO PARSED_TWEETS SELECT record:timestamp_ms::timestamp as timestamp_ms, record:retweet_count::number as retweet_count, record:favorite_count::number as favorite_count, record:quote_count::number as quote_count, record:reply_count::number as reply_count, record:entities.hashtags::varchar as hashtags, record:text::varchar as text, record:source::varchar as source, record:user.screen_name::varchar as user_screen_name, record:user.name::varchar as user_name, record:user.location::varchar as user_location, record:user.description::varchar as user_description from Logstash_tweets ;
タスクの実行状況は以下SQL文で確認可能です。
SELECT * FROM TABLE(information_schema.task_history( scheduled_time_range_start=>dateadd('hour',-1,current_timestamp()), result_limit => 100, task_name=>'Parse_Logstash_tweets'));
PARSED_TWEETS
を確認するとパースされたデータが格納されています。
可視化する
分析の一番楽しいところ、可視化です。今回はデータ加工にAlteryxを、可視化にTableauを使用しました。
まずはAlteryxでツイート内容を形態素解析し、文章を単語に分解します。形態素解析のためのツールは以下エントリを参考にインストールしてください。
PARSED_TWEETS
テーブルにAlteryx Desktopで接続し、RecordIDツールでそれぞれのレコードに連番を振ったあと、Janome Tokenizerツールを実行すると単語に分解されます。今回はhyperファイルに出力してTableauで取り込みするようにしました。
実行すると以下のように単語に分解されています。
AlteryxからSnowflakeへの接続は以下エントリをご参照ください。
次にTableau DesktopでSnowflake上のPARSED_TWEETS
テーブルとAlteryxで生成したhyperファイルに接続します。可視化した内容は以下の通り。
- 形態素解析した結果を使ってワードクラウドを生成
- どのユーザーが「クラスメソッド」「クラメソ」を含むツイートをしているのかカウントする
- 日別でツイートをカウントする
今回はTableau Desktopで作成しましたが、データソースをライブ抽出に設定してTableau Server/Onlineに出力すればデータが更新されるたびにダッシュボードも更新されます。
まとめ
Snowflakeをフル活用してTwitterをエゴサーチした結果を分析に使うの巻でした。今回はLogstashを実行している時間も短く、取得できたツイートも少なかったのであんまり映える結果になりませんでしたが、おおむねニュースやプレスリリースがよくシェアされているようです。 今回は可視化するだけに留めていますが、Amazon Comprehendなどの感情分析にかけるのも面白そうですね。そんな内容も以下のウェビナーでやっていました。
この記事がどなたかのTwitter分析ライフのお役に立てば幸いです。
参考資料
記事中でURLを貼ったもの以外の参考資料は以下の通りです。